package com.heima.schedule.service;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.heima.common.cache.CacheService;
import com.heima.model.schedule.dto.Task;
import com.heima.model.schedule.pojos.Taskinfo;
import com.heima.model.schedule.pojos.TaskinfoLogs;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;

import java.util.Date;
import java.util.List;
import java.util.Set;

/**
 * Delayed-task service that persists tasks through {@link TaskinfoService} /
 * {@link TaskinfoLogsService} and stages them in the cache for consumption.
 *
 * <p>Two cache structures are used: the {@code TOPIC} list holds tasks that are
 * already due, and the {@code FUTURE} sorted set holds tasks that become due
 * within the pre-load window, scored by their execute time in milliseconds.
 *
 * @author lzy
 * @date 2022/7/19 18:11
 */
@Slf4j
@Service
public class TaskServiceImpl implements TaskService {

    /** Cache list key for tasks whose execute time has already been reached. */
    private static final String TOPIC = "TOPIC";
    /** Cache sorted-set key for tasks due within the pre-load window. */
    private static final String FUTURE = "FUTURE";
    /** Tasks due within this many milliseconds from now are staged in the cache. */
    private static final long PRELOAD_WINDOW_MILLIS = 5 * 60 * 1000L;

    /** Log status written when a task is first created. */
    private static final int STATUS_CREATED = 0;
    /** Log status written when a task has been polled for execution. */
    private static final int STATUS_POLLED = 1;
    /** Log status written when a task has been cancelled. */
    private static final int STATUS_CANCELLED = 2;

    @Autowired
    private TaskinfoService taskinfoService;
    @Autowired
    private TaskinfoLogsService taskinfoLogsService;
    @Autowired
    private CacheService cacheService;

    /**
     * Persists a new task and its log row, then stages the task in the cache
     * if it is due now or within the pre-load window.
     *
     * <p>The id populated on the saved task row is written back onto
     * {@code task}, so that the log row and the cached JSON carry the same id
     * that {@link #cancelTask(Long)} and {@link #pollTask()} later rely on.
     *
     * @param task the task to schedule; may be {@code null}
     * @return the id of the saved task, or {@code null} when {@code task} is
     *         {@code null} or the task row could not be saved
     * @throws RuntimeException if the log row cannot be saved (rolls back the task row)
     */
    @Override
    @Transactional(rollbackFor = RuntimeException.class)
    public Long addTask(Task task) {
        if (task == null) {
            log.warn("入参不能为空");
            return null;
        }

        // Insert the task row.
        Taskinfo taskInfo = new Taskinfo();
        BeanUtils.copyProperties(task, taskInfo);
        taskInfo.setExecuteTime(new Date(task.getExecuteTime()));
        if (!taskinfoService.save(taskInfo)) {
            log.warn("任务插入失败");
            return null;
        }

        // Propagate the id of the saved row so the log row and the cached
        // payload reference the same task.
        task.setTaskId(taskInfo.getTaskId());

        // Insert the matching log row.
        TaskinfoLogs taskInfoLogs = new TaskinfoLogs();
        BeanUtils.copyProperties(task, taskInfoLogs);
        taskInfoLogs.setExecuteTime(new Date(task.getExecuteTime()));
        taskInfoLogs.setStatus(STATUS_CREATED);
        if (!taskinfoLogsService.save(taskInfoLogs)) {
            log.warn("日志插入失败");
            throw new RuntimeException("日志插入失败");
        }

        addTaskToCache(task);
        return taskInfo.getTaskId();
    }

    /**
     * Cancels a task: deletes its row, marks its log row as cancelled and
     * removes the task from both cache structures.
     *
     * @param taskId id of the task to cancel; may be {@code null}
     * @return {@code true} when the task was cancelled; {@code false} when
     *         {@code taskId} is {@code null} or the task row could not be deleted
     * @throws RuntimeException if the log row is missing or cannot be updated
     *         (rolls back the deletion)
     */
    @Override
    @Transactional(rollbackFor = RuntimeException.class)
    public Boolean cancelTask(Long taskId) {
        if (taskId == null) {
            log.warn("入参不能为空");
            return false;
        }

        // 1. Delete the task row.
        if (!taskinfoService.removeById(taskId)) {
            log.warn("任务删除失败");
            return false;
        }

        // 2. Mark the log row as cancelled.
        LambdaQueryWrapper<TaskinfoLogs> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(TaskinfoLogs::getTaskId, taskId);
        TaskinfoLogs logs = taskinfoLogsService.getOne(wrapper);
        if (logs == null) {
            // Fail explicitly instead of with a bare NullPointerException;
            // throwing keeps the rollback of the deletion above.
            log.warn("任务日志不存在, taskId={}", taskId);
            throw new RuntimeException("任务日志不存在");
        }
        logs.setStatus(STATUS_CANCELLED);
        if (!taskinfoLogsService.updateById(logs)) {
            log.warn("日志更新失败");
            throw new RuntimeException("日志更新失败");
        }

        // 3. Rebuild the task payload from the log row and remove it from the
        //    list and the sorted set. Removal is by value, so the JSON must
        //    match what was pushed when the task was staged.
        Task task = new Task();
        BeanUtils.copyProperties(logs, task);
        task.setExecuteTime(logs.getExecuteTime().getTime());
        String payload = JSON.toJSONString(task);
        cacheService.lRemove(TOPIC, 0, payload);
        cacheService.zRemove(FUTURE, payload);
        return true;
    }

    /**
     * Pops one due task from the {@code TOPIC} list, deletes its task row and
     * marks its log row as polled.
     *
     * <p>NOTE(review): the pop happens before the database work, so a task
     * whose row cannot be deleted, or whose transaction rolls back, is no
     * longer in the list afterwards.
     *
     * @return {@code true} when a task was polled and recorded; {@code false}
     *         when the list was empty, the payload could not be parsed into a
     *         task, or the task row could not be deleted
     * @throws RuntimeException if the log row cannot be updated
     */
    @Override
    @Transactional(rollbackFor = RuntimeException.class)
    public Boolean pollTask() {
        // 1. Pop the next payload.
        String taskString = cacheService.lRightPop(TOPIC);
        if (StringUtils.isEmpty(taskString)) {
            log.warn("没有可执行任务");
            return false;
        }

        // 2. Parse it back into a task.
        Task task = JSON.parseObject(taskString, Task.class);
        if (task == null) {
            log.warn("没有可执行任务");
            return false;
        }

        // 3. Delete the task row.
        if (!taskinfoService.removeById(task.getTaskId())) {
            log.warn("删除任务失败");
            return false;
        }

        // 4. Mark the log row as polled.
        TaskinfoLogs logs = new TaskinfoLogs();
        BeanUtils.copyProperties(task, logs);
        logs.setExecuteTime(new Date(task.getExecuteTime()));
        logs.setStatus(STATUS_POLLED);
        if (!taskinfoLogsService.updateById(logs)) {
            log.warn("日志更新失败");
            throw new RuntimeException("日志更新失败");
        }
        return true;
    }

    /**
     * Periodically moves tasks whose score in the {@code FUTURE} sorted set is
     * at or below the current time into the {@code TOPIC} list.
     *
     * <p>Does nothing when the "refresh" lock cannot be obtained.
     * NOTE(review): the lock token is never released here; presumably the lock
     * expires on its own after the value passed to {@code tryLock} - confirm.
     */
    @Scheduled(fixedRate = 1000 * 60 * 5)
    public void refresh() {
        log.info("我被执行了");
        String lockToken = cacheService.tryLock("refresh", 5000);
        if (StringUtils.isEmpty(lockToken)) {
            return;
        }
        Set<String> future = cacheService.zRangeByScore(FUTURE, 0, System.currentTimeMillis());
        if (CollectionUtils.isEmpty(future)) {
            log.warn("没有可移动的任务");
            return;
        }
        cacheService.refreshWithPipeline(FUTURE, TOPIC, future);
    }

    /**
     * Periodically rebuilds the cache from the task table: clears both cache
     * structures, then stages every stored task that is due now or within the
     * pre-load window.
     *
     * <p>Rows without an execute time are skipped so that a single bad row
     * does not abort the reload. NOTE(review): between the clear and the
     * reload the cache is temporarily empty.
     */
    @Scheduled(fixedRate = 5 * 60 * 1000)
    public void initData() {
        // Drop whatever is currently staged.
        clear();
        List<Taskinfo> list = taskinfoService.list();
        if (CollectionUtils.isEmpty(list)) {
            log.warn("任务为空");
            return;
        }

        for (Taskinfo taskinfo : list) {
            if (taskinfo == null) {
                continue;
            }
            if (taskinfo.getExecuteTime() == null) {
                log.warn("任务执行时间为空, taskId={}", taskinfo.getTaskId());
                continue;
            }

            Task task = new Task();
            BeanUtils.copyProperties(taskinfo, task);
            task.setExecuteTime(taskinfo.getExecuteTime().getTime());
            addTaskToCache(task);
        }
    }

    /** Deletes both the {@code TOPIC} and the {@code FUTURE} cache keys. */
    public void clear() {
        cacheService.delete(TOPIC);
        cacheService.delete(FUTURE);
    }

    /**
     * Stages a task in the cache according to its execute time: the
     * {@code TOPIC} list when it is already due, the {@code FUTURE} sorted set
     * (scored by execute time) when it is due within the pre-load window, and
     * nowhere when it is due later than that.
     */
    private void addTaskToCache(Task task) {
        long executeTime = task.getExecuteTime();
        long currentTime = System.currentTimeMillis();
        if (executeTime <= currentTime) {
            cacheService.lLeftPush(TOPIC, JSON.toJSONString(task));
        } else if (executeTime <= currentTime + PRELOAD_WINDOW_MILLIS) {
            cacheService.zAdd(FUTURE, JSON.toJSONString(task), executeTime);
        }
    }
}
